Kinesis FirehoseのS3連携をKinesis StreamとLambdaで実装する #reinvent
re:invent 2015 で発表された Kinesis Firehose を使うと、管理画面の操作だけで Kinesis のデータを S3/Redshift に連携します。 Kiensis Firehose を使うとサーバーレスになる上、Kinesis Stream のシャードやイテレーターなどの面倒な操作から開発者を解放してくれます。
ただ残念なことに
- US East (N. Virginia)
- US West (Oregon)
- EU (Ireland)
のリージョンにしか対応しておらず、東京リージョンではまだ使えません。
仕方がないので Kinesis Stream と Lambda で同等の機能を実装してみました。
Kinesis Firehose の S3 連携について
Kinesis Firehose の設定画面は下図のようになっています。
- 連携先のバケット名(S3 bucket)
- S3のキーのプリフィックス(S3 prefix)
を指定すると、S3 バケットに次のように連携されます。
. └── S3Prefix2015 └── 10 └── 16 ├── 22 │ ├── StreamName-1-2015-10-16-22-52-25-ea7c6625-b9d1-413b-ac23-563825c0f8b7 │ └── StreamName-1-2015-10-16-22-57-26-1f89f472-28da-47ac-a6a5-362e0250e6db └── 23 └── StreamName-4-2015-10-16-23-02-27-ed8fc4d7-f7cd-42e4-8417-dca6602f0b70
S3 オブジェクトのキー全体は
S3Prefix2015/10/16/22/StreamName-1-2015-10-16-22-52-25-ea7c6625-b9d1-413b-ac23-563825c0f8b7
というようになっており、
{S3 Prefix}YYYY/MM/DD/HH/{Firehose Stream Name}-{Shard Id?}-YYYY-MM-DD-HH-MM-SS-FF-{Hash}
というような形をしています。
- Shard Id? と思しき箇所は正確には何が使われているのかは不明です。
- Hash で何が使われているのかは不明です。
Lambda の Kinesis -> S3 連携について
実装方針
今回は Firehose の S3 連携処理を、データソースを Kinesis Stream にして Lambda を使って実現します。
- 連携先S3バケット名
- S3 Prefix
- Firehose Stream Name
- Shard Id
は Lambda 関数内で決め打ちし、Hash 関数には uuid v4 を採用します。
Lambda 関数について
Lambda のランタイムには re:invent 2015 ぽさを醸し出すために Python を使います。
Lambda 関数に Kinesis Stream のブループリントがすでに存在するため
- S3 キーの生成
- S3 への Put
を追加したのが以下です。
# vim: set fileencoding=utf8 : import base64 import datetime import uuid import boto3 S3_BUCKET = 'YOUR_BUCKET_NAME' S3_PREFIX = 'prefix_test' STREAM_NAME = 'Stream_Name' SHARD_ID = 1 LINE_TERMINATOR = '\r\n' def get_s3_key(): today = datetime.datetime.utcnow() return "{}{}{}-{}-{}-{}".format( S3_PREFIX, today.strftime('%Y/%m/%d/%H/'), STREAM_NAME, SHARD_ID, today.strftime('%Y-%m-%d-%H-%S-%f')[:-4], uuid.uuid4()) def decode(payload): # TODO # e.g.) string to JSON return payload def process(payload): # TODO # logic comes here return payload def put_to_s3(body): s3_client = boto3.client('s3') s3_client.put_object(Body=body, Bucket=S3_BUCKET, Key=get_s3_key()) def lambda_handler(event, context): buff = [] for record in event['Records']: #Kinesis data is base64 encoded so decode here payload=base64.b64decode(record["kinesis"]["data"]) print "Decoded payload: " + payload buff.append(process(decode(payload))) put_to_s3(LINE_TERMINATOR.join(buff)) return 'Successfully processed {} records.'.format(len(event['Records']))
Kinesis Firehose と同じく、Kinesis のデータを加工せずに S3 に PutObject しているだけですが
- 文字列を他のフォーマット(JSONなど)に変換する関数(decode)
- データ処理する関数(process)
も用意しています。
このLambda関数の登録方法はドキュメントを参照ください。
利用しているモジュールのうち boto3 だけは標準ライブラリではありません。しかし、boto3 は Lambda の Python ランタイムにはインストール済みのため、boto3 モジュールの登録は不要です。
S3 連携を確認
Kinesis Stream にレコード追加
CLI を利用して Kinesis Stream にレコード追加します。
$ aws kinesis put-record --stream-name StreamName --partition-key 1 --data 1,2,3 { "ShardId": "shardId-000000000000", "SequenceNumber": "49555444218629747788391510344658619821542396257728724994" } $ aws kinesis put-record --stream-name StreamName --partition-key 2 --data 'hello world!' { "ShardId": "shardId-000000000000", "SequenceNumber": "49555444218629747788391510344659828747362011642817675266" }
S3 バケットの確認
連携先 S3 バケットをローカルに持ってきて中身を確認します。
$ aws s3 sync s3://BUCKET_NAME BUCKET_NAME $ tree BUCKET_NAME BUCKET_NAME └── prefix_test2015 └── 10 └── 17 └── 00 ├── StreamName-1-2015-10-17-00-01-99-bb8faeb8-8ae5-4e54-87b1-b0d00c773f5b └── StreamName-1-2015-10-17-00-11-67-50b7087e-6e57-40bc-a99d-a4e2277e70b2 4 directories, 2 files
ファイルの中身も確認してみましょう。
$ cat BUCKET_NAME/prefix_test2015/10/17/00/StreamName-1-2015-10-17-00-01-99-bb8faeb8-8ae5-4e54-87b1-b0d00c773f5b 1,2,3 $ cat BUCKET_NAME/prefix_test2015/10/17/00/StreamName-1-2015-10-17-00-11-67-50b7087e-6e57-40bc-a99d-a4e2277e70b2 hello world!
Kinesis に Put したデータが無事 S3 に連携されていますね。
まとめ
この程度の処理であれば Lambda 関数を書くのもそれほど手間ではありませんが、さっさと Kinesis Firehose が東京リージョンにやってきて、画面ポチだけで S3 連携されるようになって欲しいですね!